package com.niit.kafka;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Set;

//指定偏移量进行消费
public class ConsumerSeek {

    public static void main(String[] args) throws InterruptedException {
        //1.配置链接信息
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //如果想再次从指定的偏移量进行消费 要么修改组名，那么会马上立即从指定的偏移量进行消费。不修改组名需要一定延迟下才会继续消费
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"BD2_5");
        //--- 主题是新主题  指定从最早消费。。  如果是旧的 那就不指定从最早消费
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");// --> 偏移量改为 0

        //2.创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);

        //3.订阅主题
        consumer.subscribe(Arrays.asList("BD2_old_user"));

        //4.1指定位置进行消费    获得当前消费者 所消费的主题下所有分区信息
        Set<TopicPartition> assignment = consumer.assignment();
        //4.2保证分区分配方案指定完成
        while (assignment.size() == 0){
            consumer.poll(Duration.ofSeconds(1));
            assignment = consumer.assignment();
        }
        for (TopicPartition topicPartition :assignment) {
            consumer.seek(topicPartition,20); //--> 偏移量改成 20
        }

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));

            for (ConsumerRecord<String,String> record:records) {
                 System.out.println(record);
                Thread.sleep(5000);

            }
        }


    }
}
